Stream API 下

Collector 收集

收集器用来将经过筛选、映射的流进行最后的整理,可以使得最后的结果以不同的形式展现。
collect 方法即为收集器,它接收 Collector 接口的实现作为具体收集器的收集方法。
Collector 接口提供了很多默认实现的方法,我们可以直接使用它们格式化流的结果;也可以自定义 Collector 接口的实现,从而定制自己的收集器。

归约

流由一个个元素组成,归约就是将一个个元素“折叠”成一个值,如求和、求最值、求平均值都是归约操作。

一般性归约

若你需要自定义一个归约操作,那么需要使用 Collectors.reducing 函数,该函数接收三个参数:

  • 第一个参数为归约的初始值
  • 第二个参数为归约操作进行的字段
  • 第三个参数为归约操作的过程

汇总

Collectors类专门为汇总提供了一个工厂方法:Collectors.summingInt
它可接受一 个把对象映射为求和所需int的函数,并返回一个收集器;该收集器在传递给普通的 collect 方法后即执行我们需要的汇总操作。

分组

数据分组是一种更自然的分割数据操作,分组就是将流中的元素按照指定类别进行划分,类似于SQL语句中的 GROUPBY

多级分组

多级分组可以支持在完成一次分组后,分别对每个小组再进行分组。
使用具有两个参数的 groupingBy 重载方法即可实现多级分组。

  • 第一个参数:一级分组的条件
  • 第二个参数:一个新的 groupingBy 函数,该函数包含二级分组的条件

Collectors 类的静态工厂方法

工厂方法返回类型用途示例
toListList<T>把流中所有项目收集到一个 ListList<Project> projects = projectStream.collect(toList());
toSetSet<T>把流中所有项目收集到一个 Set,删除重复项Set<Project> projects = projectStream.collect(toSet());
toCollectionCollection<T>把流中所有项目收集到给定的供应源创建的集合Collection<Project> projects = projectStream.collect(toCollection(), ArrayList::new);
countingLong计算流中元素的个数long howManyProjects = projectStream.collect(counting());
summingIntInteger对流中项目的一个整数属性求和int totalStars = projectStream.collect(summingInt(Project::getStars));
averagingIntDouble计算流中项目 Integer 属性的平均值double avgStars = projectStream.collect(averagingInt(Project::getStars));
summarizingIntIntSummaryStatistics收集关于流中项目 Integer 属性的统计值,例如最大、最小、 总和与平均值IntSummaryStatistics projectStatistics = projectStream.collect(summarizingInt(Project::getStars));
joiningString连接对流中每个项目调用 toString 方法所生成的字符串String shortProject = projectStream.map(Project::getName).collect(joining(", "));
maxByOptional<T>按照给定比较器选出的最大元素的 Optional, 或如果流为空则为 Optional.empty()Optional<Project> fattest = projectStream.collect(maxBy(comparingInt(Project::getStars)));
minByOptional<T>按照给定比较器选出的最小元素的 Optional, 或如果流为空则为 Optional.empty()Optional<Project> fattest = projectStream.collect(minBy(comparingInt(Project::getStars)));
reducing归约操作产生的类型从一个作为累加器的初始值开始,利用 BinaryOperator 与流中的元素逐个结合,从而将流归约为单个值int totalStars = projectStream.collect(reducing(0, Project::getStars, Integer::sum));
collectingAndThen转换函数返回的类型包含另一个收集器,对其结果应用转换函数int howManyProjects = projectStream.collect(collectingAndThen(toList(), List::size));
groupingByMap<K, List<T>>根据项目的一个属性的值对流中的项目作问组,并将属性值作 为结果 Map 的键Map<String,List<Project>> projectByLanguage = projectStream.collect(groupingBy(Project::getLanguage));
partitioningByMap<Boolean,List<T>>根据对流中每个项目应用断言的结果来对项目进行分区Map<Boolean,List<Project>> vegetarianDishes = projectStream.collect(partitioningBy(Project::isVegetarian));

转换类型

有一些收集器可以生成其他集合。比如前面已经见过的 toList,生成了 java.util.List 类的实例。
还有 toSettoCollection,分别生成 SetCollection 类的实例。
到目前为止, 我已经讲了很多流上的链式操作,但总有一些时候,需要最终生成一个集合——比如:

  • 已有代码是为集合编写的,因此需要将流转换成集合传入;
  • 在集合上进行一系列链式操作后,最终希望生成一个值;
  • 写单元测试时,需要对某个具体的集合做断言。

使用 toCollection,用定制的集合收集元素

stream.collect(toCollection(TreeSet::new));

还可以利用收集器让流生成一个值。 maxByminBy 允许用户按某种特定的顺序生成一个值。

数据分区

分区是分组的特殊情况:由一个断言(返回一个布尔值的函数)作为分类函数,它称分区函数。
分区函数返回一个布尔值,这意味着得到的分组 Map 的键类型是 Boolean,于是它最多可以分为两组: true是一组,false是一组。

分区的好处在于保留了分区函数返回true或false的两套流元素列表。

并行流

并行流就是一个把内容分成多个数据块,并用不不同的线程分别处理每个数据块的流。最后合并每个数据块的计算结果。

将一个顺序执行的流转变成一个并发的流只要调用 parallel() 方法

public static long parallelSum(long n){
    return Stream.iterate(1L, i -> i +1).limit(n).parallel().reduce(0L,Long::sum);
}

将一个并发流转成顺序的流只要调用 sequential() 方法

stream.parallel().filter(...).sequential().map(...).parallel().reduce();

这两个方法可以多次调用,只有最后一个调用决定这个流是顺序的还是并发的。

并发流使用的默认线程数等于你机器的处理器核心数。

通过这个方法可以修改这个值,这是全局属性。

System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "12");

并非使用多线程并行流处理数据的性能一定高于单线程顺序流的性能,因为性能受到多种因素的影响。
如何高效使用并发流的一些建议:

  1. 如果不确定, 就自己测试。
  2. 尽量使用基本类型的流 IntStream, LongStream, DoubleStream
  3. 有些操作使用并发流的性能会比顺序流的性能更差,比如limit,findFirst,依赖元素顺序的操作在并发流中是极其消耗性能的。findAny的性能就会好很多,应为不依赖顺序。
  4. 考虑流中计算的性能(Q)和操作的性能(N)的对比, Q表示单个处理所需的时间,N表示需要处理的数量,如果Q的值越大, 使用并发流的性能就会越高。
  5. 数据量不大时使用并发流,性能得不到提升。
  6. 考虑数据结构:并发流需要对数据进行分解,不同的数据结构被分解的性能时不一样的。

流的数据源和可分解性

可分解性
ArrayList非常好
LinkedList
IntStream.range非常好
Stream.iterate
HashSet
TreeSet

流的特性以及中间操作对流的修改都会对数据对分解性能造成影响。 比如固定大小的流在任务分解的时候就可以平均分配,但是如果有filter操作,那么流就不能预先知道在这个操作后还会剩余多少元素。

考虑终端操作的性能:如果终端操作在合并并发流的计算结果时的性能消耗太大,那么使用并发流提升的性能就会得不偿失。